package com.hsy.spring.boot.rsocket.service.impl;

import java.time.Duration;
import java.util.stream.Stream;

import org.springframework.stereotype.Service;

import com.hsy.spring.boot.rsocket.domain.RSocketReq;
import com.hsy.spring.boot.rsocket.service.RSocketService;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * 服务接口实现
 *
 * @author HuoShengyu
 * @version 1.0
 * @date 2019-11-28 14:46:56
 * @see RSocketService
 */
@Slf4j
@Service
public class RSocketServiceImpl implements RSocketService {
	@Override
	public void add(RSocketReq rsocketReq) {
		log.info("New rsocketReq data: {}", rsocketReq);
	}

	@Override
	public Mono<RSocketReq> getOne(RSocketReq rsocketReq) {
		rsocketReq.setMessage("Mono");
		return Mono.just(rsocketReq);
	}

	@Override
	public Flux<RSocketReq> getAll(RSocketReq rsocketReq) {
		return Flux.fromStream(Stream.generate(() -> {
			rsocketReq.setMessage("Flux");
			return rsocketReq;
		})).log().delayElements(Duration.ofSeconds(1));
	}
}
